package org.lisy.distributed.lock.zookeeper;

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Demonstrates leader election with Curator's {@link LeaderSelector}.
 *
 * <p>How the election works: participants compete for a Curator distributed lock
 * (InterProcessMutex); whoever obtains the lock is the leader. Once an instance has
 * joined the election it keeps holding, or keeps competing for, leadership until
 * {@code takeLeadership} returns or {@code close()} removes it from the election.
 * An instance whose {@code takeLeadership} has returned only rejoins the election
 * automatically if {@code autoRequeue()} has been called.
 *
 * <p>Threading: the status flags below are written from Curator callback threads
 * ({@link #stateChanged} and {@link #takeLeadership}) and polled from the thread that
 * runs {@link #main}, so they are declared {@code volatile}.
 *
 * @author lisy
 */
public class LeaderSelectorLock implements LeaderSelectorListener {

	/** ZooKeeper connect string used by {@link #init()}. */
	private static final String CONNECT_STRING = "127.0.0.1:2181";
	/** ZNode path under which the election takes place. */
	private static final String LEADER_PATH = "/distributed/lock/leader";
	/** Seconds between two status lines printed by the polling loop. */
	private static final long POLL_INTERVAL_SECONDS = 2;
	/** Seconds the polling loop pauses after noticing that a leadership term ended. */
	private static final long REQUEUE_PAUSE_SECONDS = 5;
	/** Seconds a simulated leadership term lasts. */
	private static final long LEADERSHIP_SECONDS = 10;

	/** Curator client; assigned by {@link #init()}. */
	volatile CuratorFramework client = null;
	/** Leader selector; assigned by {@link #init()} after the client has been started. */
	volatile LeaderSelector selector = null;
	/** {@code true} while the last reported connection state was CONNECTED or RECONNECTED. */
	volatile boolean connectionOk = false;
	/** {@code true} while {@link #takeLeadership} is executing. */
	volatile boolean isLeader = false;
	/** Set to 1 when a leadership term ends; reset to 0 once the polling loop has reported it. */
	volatile int count = 0;

	/**
	 * Starts one election participant and prints its status until the thread is interrupted.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		LeaderSelectorLock main = new LeaderSelectorLock();
		main.init();
		try {
			main.reportStatusUntilInterrupted();
		} catch (InterruptedException e) {
			// Restore the flag instead of swallowing the interrupt, then fall through to cleanup.
			Thread.currentThread().interrupt();
		} finally {
			main.close();
		}
	}

	/**
	 * Creates and starts the Curator client, then creates and starts the leader selector
	 * with auto-requeue enabled.
	 */
	public void init() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		client = CuratorFrameworkFactory.newClient(CONNECT_STRING, retryPolicy);
		// Registered before start() so that the very first CONNECTED event is observed.
		// NOTE(review): LeaderSelector also registers its listener for connection events,
		// so stateChanged may be invoked twice per state change; the handler is idempotent.
		client.getConnectionStateListenable().addListener(this);
		client.start();
		selector = new LeaderSelector(client, LEADER_PATH, this);
		selector.autoRequeue();
		selector.start();
	}

	/**
	 * Leaves the election and closes the client. Intended to be called once, after
	 * {@link #init()} has completed.
	 */
	public void close() {
		LeaderSelector currentSelector = selector;
		CuratorFramework currentClient = client;
		try {
			if (currentSelector != null) {
				currentSelector.close();
			}
		} finally {
			// Close the client even if closing the selector failed.
			if (currentClient != null) {
				currentClient.close();
			}
		}
	}

	/**
	 * Records whether the connection is usable and gives up leadership when it is not.
	 *
	 * @param client the client reporting the change
	 * @param newState the new connection state
	 */
	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		log(" access stateChanged :" + newState);
		connectionOk = newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED;
		if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
			// Leadership can no longer be assumed once the connection is suspended or lost,
			// so interrupt a running takeLeadership instead of letting it act as leader.
			LeaderSelector currentSelector = selector;
			if (currentSelector != null) {
				currentSelector.interruptLeadership();
			}
		}
	}

	/**
	 * Simulates a leadership term: marks this instance as leader, sleeps, then releases
	 * leadership by returning.
	 *
	 * @param client the client that acquired leadership
	 * @throws Exception if the term is interrupted while sleeping
	 */
	@Override
	public void takeLeadership(CuratorFramework client) throws Exception {
		log(" takeLeadership start ");
		isLeader = true;
		try {
			TimeUnit.SECONDS.sleep(LEADERSHIP_SECONDS);
		} finally {
			// Always clear the leader flag, including when the term is cut short by an
			// interrupt; otherwise the polling loop would keep reporting leadership.
			count = 1;
			isLeader = false;
			log(" takeLeadership end ");
		}
	}

	/**
	 * Polls the status flags and prints one line per iteration until interrupted.
	 *
	 * @throws InterruptedException if the polling thread is interrupted while sleeping
	 */
	private void reportStatusUntilInterrupted() throws InterruptedException {
		while (true) {
			if (!connectionOk) {
				log(" not connectionOk ! ");
				TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
				continue;
			}
			if (count == 1) {
				log(" lock release, add autoRequeue ! ");
				// Auto-requeue is already enabled in init(); calling it again only re-asserts it.
				selector.autoRequeue();
				count = 0;
				TimeUnit.SECONDS.sleep(REQUEUE_PAUSE_SECONDS);
			} else {
				log(isLeader ? " lock effective ! " : " not leader ! ");
				TimeUnit.SECONDS.sleep(POLL_INTERVAL_SECONDS);
			}
		}
	}

	/** Prints {@code message} prefixed with the current timestamp. */
	private static void log(String message) {
		System.out.println(new Timestamp(System.currentTimeMillis()) + message);
	}

}
